package opmq

import (
	"context"
	"gitee.com/maomaomaoge/opmq/grpc_proto"
	"google.golang.org/grpc"
)

type ClusterNet struct {
	// 集群本身节点对外ip
	Ip      string `json:"ip"`
	// grpc端口号
	RpcPort string `json:"rpc_port"`
}

type ClusterSelf struct {
	// 节点唯一id
	Id string `json:"id"`

	ClusterNet *ClusterNet

	// 主节点标示
	IsMaster bool `json:"is_master"`

	// 如果当前是从节点，远程主节点Ip,对应的是master Mqtt地址
	RemoteMasterIp string `json:"remote_master_ip"`
	MasterRocPort  string `json:"master_roc_port"`

	Nodes []*ClusterNode

	Job chan *ClusterMessage
}

func NewClusterSelf() *ClusterSelf {
	cs := &ClusterSelf{
		Nodes:      make([]*ClusterNode, 0),
		Job:        make(chan *ClusterMessage, 1),
		ClusterNet: &ClusterNet{},
	}

	go cs.run()
	return cs
}

func (cs *ClusterSelf) Submit(job *ClusterMessage) {
	cs.Job <- job
}

func (cs *ClusterSelf) run() {
	for v := range cs.Job {
		go cs.accept(v)
	}
}

// 接受消息，发送
// 找到列表中的节点哪个有这条消息
func (cs *ClusterSelf) accept(msg *ClusterMessage) {
	//CLUSTER_DEBUG.Println("需要转发的任务: ", msg.Topic, " 当前节点的存储的节点数量: ", len(cs.Nodes))

	for _, v := range cs.Nodes {
		//CLUSTER_DEBUG.Println(v.Id, " topic: ", v.Topics)
		for _, v2 := range v.Topics {
			if v2 == msg.Topic {
				v.Send(msg.Topic, msg.PayLoad)
			}
		}
	}
}

// NodeJoin 节点添加
// 检查存在情况，进行更新
func (cs *ClusterSelf) NodeJoin(ip string, port string, id string, topic []string) {
	nodes := make([]*ClusterNode, 0)
	for _, v := range cs.Nodes {
		if v.Id != id {
			DEBUG.Println("id校验: ", v.Id, id)
			nodes = append(nodes, v)
		}
	}

	node := NewClusterNode(id, ip, port, topic)
	nodes = append(nodes, node)
	cs.Nodes = nodes
}

func (cs *ClusterSelf) PushAndPull() {
	if cs.IsMaster {
		return
	}

	conn, err := grpc.Dial(cs.RemoteMasterIp+":"+cs.MasterRocPort, grpc.WithInsecure())
	if err != nil {
		PANIC.Println("和主节点建立连接失败")
		return
	}

	client := grpc_proto.NewOpmqRpcClient(conn)
	client.Push(context.Background(), &grpc_proto.ClusterNode{
		Id:    cs.Id,
		Ip:    cs.ClusterNet.Ip,
		Port:  Broker.MqttPort,
		Topic: Broker.GetUniqueSublist(),
	})

	pull, err := client.Pull(context.Background(), &grpc_proto.Null{})
	if err != nil {
		return
	}

	for _, v := range pull.Data {
		cs.NodeJoin(v.Ip, v.Port, v.Id, v.Topic)
	}
}
